Amazon Elastic MapReduce入門 〜 Apache Mahoutでレコメンデーション!
よく訓練されたアップル信者、都元です。Hadoop使ってますか。試しにHadoopを使ってみよう、と思った時に主に障害となるのが以下の3つです。
- Hadoopのクラスタを組むために実機を複数用意するのが厄介。それをクラスタとして組み上げるのも厄介。
- Hadoopの上で動かすアプリケーションをMapReduceで書くのが厄介。
- Hadoopで処理するほどのビッグデータを用意するのが厄介。
1つ目はAmazon Elastic MapReduce (EMR)を使う事でスマートに解決しましょう。
2つ目については、オープンソースのMapReduceアプリケーションを使います。私が強い興味を持っている分野に「機械学習」というものがあります。機械学習とは、コンピュータにデータを分析させ、未知の情報についての予測をさせたり、人間の知能に近い機能を実現しようという試みです。今回は、この機械学習の各種アルゴリズムのOSS実装であるApache Mahoutを利用します。つまり、今回はコーディング不要です。
3つ目については本エントリでは匿名化して公開されているpublicなデータセットBook-Crossing Datasetを利用します。
レコメンデーション
Amazon.co.jpでの「この本を買った人は、あの本も買っています」という表示や、マイストア等の機能は、大勢のユーザの購買履歴に基づいて、各ユーザが興味を持ちそうな製品を算出しています。また、Facebookの「もしかして、この人と知り合いではありませんか?」というのも、機械学習によるものです。
機械学習の一種として「レコメンデーション(推薦)」というものがあります。レコメンデーションのアルゴリズムの1つが「協調フィルタリング」という手法です。大量の「誰が(ユーザ)」「何を(アイテム)」「どの程度好きか(レーティング)」というデータを分析して、レコメンド対象の人物(A)と似ている嗜好を持つ別の人物(B)を探します。その時、Bさんが高いレーティングをつけているアイテムは、Aさんに対してもお薦めできる、という理屈です。
Amazon様は「Chef勉強しなきゃと思ってるでしょ」とか「もしかしてRaspberry Pi買ってない?」とか「そういえば子供いたよね」とか、都元のことはだいたいお見通しなわけです。
MapReduceとHadoop
上の例のように、機械学習というのは計算に基づいて結果を出すわけですが、その基礎となるデータが多ければ多いほど、確からしい結果を出してくれます。が、しかし、データが多ければ多いほど、指数的に計算量が増加する傾向があります。近年、このような大量データ処理のニーズは高まっています。いわゆるビッグデータという奴です。
精度は上げたいが、計算に何日も掛かるようでは使い物になりません。とは言え、計算のスピードアップというのは結構大変なのはご存知の通りです。例えば、逐次処理をしているバッチアプリケーションがあり、この処理を1回実行するのに1台のマシンで10時間掛かるとします。そこでユーザから「マシンは10台…いや、100台用意するから、1時間で終わるようにしてくれないか」と言われたとします。プログラムに非効率な部分があれば、そこを改善することで劇的なスピードアップが可能かもしれませんが、特にそういった欠陥が無かった場合、かなり困るケースが多いのではないでしょうか。
そこで、プログラミングのパラダイムを変えることによって、コンピュータリソースを投入すれば相応の時間の短縮が望める仕組みが考案されました。これがMapReduceです。そして、MapReduceパラダイムで実装されたプログラムを実行する基盤がApache Hadoopです。
Mahoutには、MapReduceパラダイムで実装されたレコメンデーションのアルゴリズムが含まれていますので、今回はそれを利用することにします。
Amazon Elastic MapReduce
Hadoopは、複数台のノード(要するにマシン)でクラスタを組み、相互に協調しながら計算をします。ノードの数を増やせば計算は速くなりますし、減らせば遅くなります。
前述のように、通常Hadoopを利用する際は、複数台のマシンを用意し、OSとネットワークを整備し、Hadoopをセットアップし、1つのクラスタとして組み上げなければなりません。しかし今回のような「ちょっと試してみよう」というレベルで実機のクラスタを組めるような時間と気力はありません。
そういった環境づくりが得意なのは、まさにAWSですね。EMRは、セットアップ済みのHadoopクラスタを指定台数で組み上げて、MapReduceの実行基盤を提供してくれるサービスです。利用にかかる費用としては、入出力用のS3に掛かる料金+Hadoopノードとして動くEC2に掛かるの料金+EMRの料金です。EMRの料金は、EC2インスタンス費用の25%程度と考えると良いと思います。普段EC2を使う感覚の2割5分増しのイメージです。
チュートリアル
では早速、EMR上でMahoutのレコメンデーションを実行してみましょう。その前提として。今回のEMRの試行はus-east-1リージョンを利用することにします。
まずは処理対象のレーティングデータが必要です。今回はBook-Crossing Datasetという匿名化されたレーティングデータを利用します。このデータセットは約27万人による、約27万種類の書籍に対する、約100万件のレーティング情報です。正直、たった100万件のデータをMapReduceで処理するのは「大根を正宗で切る」ようなもの *1ですが、とりあえずチュートリアルということでご勘弁を。Hadoopが必須になるほどの大量の意味のあるデータ、というのは広いインターネットの世界にも残念ながらそうそう落ちていません。かといって、ランダムに値を生成したのでは、機械学習にかける面白みも半減してしまいます。もし面白いビッグデータがあったら、是非教えてください。
データの加工
まず、Book-Crossing Datasetから「CSV Dump」という方のzipファイルをダウンロードし、展開します。BX-Books.csvBX-Users.csvBX-Book-Ratings.csvという3つのファイルができあがります。それぞれ、アイテムマスタ、ユーザマスタ、レーティングのデータを含んでいます。中身を覗いてみると、CSVと言いながら、カンマ区切り(comma separated)ではなく、セミコロン区切り(semicolon separated)なのが謎ですが。まぁこれらのデータはそのままMahoutに突っ込むことはできません。
Mahoutが必要とする形式はユーザID(long),アイテムID(long),レーティング(float)という形式のCSV (comma separated values)です。これらのファイルを生成するため、以下のpythonスクリプトを実行してください。
#! /usr/bin/python import sys import csv import collections def main(argv): idx = create_book_id_index() ratings = csv.reader(open('BX-Book-Ratings.csv', 'rb'), delimiter=';', quotechar="\"") out = open('Ratings.csv', 'w') for row in ratings: try: out.write('%s,%d,%s\n' % (row[0], idx[row[1]], row[2])) except KeyError: pass out.close() def create_book_id_index(): books = csv.reader(open('BX-Books.csv', 'rb'), delimiter=';', quotechar="\"") book_index = open('Book-ID-Index.csv', 'w') idx = {} counter = -1 for row in books: counter += 1 if counter == 0: continue book_index.write('%d,%s\n' % (counter, row[0])) idx[row[0]] = counter book_index.close() return idx if __name__ == '__main__': main(sys.argv)
実行が終わると、カレントディレクトリにBook-ID-Index.csvとRatings.csvというファイルが出来上がるはずです。前者は、書籍のISBNとlong型のIDの対応表です。Mahoutは整数値によるアイテムIDを要求しますが、Book-Crossing Datasetでは、書籍のIDをISBNで扱っているので、その対応付けのための情報 *2になります。
後者のRatings.csvがMahoutに処理させるデータです。中身は以下のような形式になっているはずです。
276725,2967,0 276726,225830,5 276727,11055,0 ...
S3にデータとMahoutをアップロードする
さて、EMRでMahoutを利用するために、上で作成したRatings.csv及びMahoutのJobプログラムを Amazon S3 にアップロードします。Mahout本体は公式サイトからDLしておきましょう。本チュートリアルでは v0.7 を利用しています。以下のような名前のファイル(どちらか1つでOK)が目的の本体です。このファイルを展開した中にあるmahout-core-0.7-job.jarを今回利用します。
- mahout-distribution-0.7.tar.gz
- mahout-distribution-0.7.zip
新しいS3 bucketを作ります。リージョンは US Standard ですね。バケットの名前はここではcm-emr-tutorialとしますが、同じ名前のバケットは作れないため、各自別の名前でバケットを作成してください。また、以下の手順で出てくるcm-emr-tutorialという部分は、ここで作ったバケット名に置き換えて読んでください。
bucketを作成したら、先ほどのファイルを/input-bx/Ratings.csvと/mahout/mahout-core-0.7-job.jarとして、アップロードします。
レコメンドの計算処理を起動する(jobflowの起動)
いよいよ、レコメンドの処理を実行します。
ユーザは、Elastic MapReduceの処理を「jobflow」という単位で起動します。Elastic MapReduceは、ユーザからjobflowを介して処理の指示を受け取り、一連の計算処理を実行し、そして進捗をユーザに提示します。jobflowは、処理の流れの面から見ると1つ以上の「step」から成っており、また、リソース面から見ると、1つ以上のEC2インスタンスから成っています。stepについては、後述します。
ここでは4台体制のHadoopクラスタ(Master Instance 1台+Core Instance 3台)でのレコメンドを行います。また、起動するEC2インスタンスのタイプは全て Small (m1.small) とします。また、jobflowには「バッチjobflow」と「インタラクティブjobflow」の2種類がありますが、ここでは前者を利用します。
まずEMRのManagementConsoleに行き、Create New Job Flowボタンをクリックします。
jobflowの名前(任意、ここではhello-mahout)を入力し、Hadoop Versionはデフォルトのままの「Hadoop 1.0.3」、jobflowのタイプは「Custom JAR」を選択します。
続いて、実行する処理について、Mahoutの所在と起動引数を設定します。以下、コピペ用。bucket名を変えるのを忘れないようにしてくださいね。
- JAR Location
-
cm-emr-tutorial/mahout/mahout-core-0.7-job.jar
- JAR Arguments
-
org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -Dmapred.map.tasks=40 -Dmapred.reduce.tasks=8 -Dmapred.input.dir=s3n://cm-emr-tutorial/input-bx -Dmapred.output.dir=s3n://cm-emr-tutorial/output-bx --numRecommendations 100 --similarityClassname SIMILARITY_PEARSON_CORRELATION
JAR Argumentsにおいて、mapred.output.dirというプロパティで、計算結果の出力先を指定しています。EMRの実行に先立っては、毎回、このロケーションにファイルが既に存在しないことを充分に確認してください。以前の計算結果などが存在すると、Hadoopは決してデータを上書きせず、エラー終了します。MapReduceの計算結果は、通常よりも高い計算コストを掛けて導きだした結果であるため、簡単に消えないような仕様になっているのです。
次に、起動するEC2インスタンスのタイプや数を設定します。基本的には以下の通り。今回はCore Instanceの3台については、EC2のスポットインスタンスを指定してみました。数が完全に3つ揃わなくても大きな問題はないですからね。
次のページではログ置き場の設定等。後ほど、jobflowの進捗をオンラインで確認するため、Debugは有効にしておきましょう。
次のページではBootstrap Actionというものを設定します。ここでは「Configure your Bootstrap Actions」を選択して、Action Typeを「Memory Intensive Configuration」(メモリ集約型)に設定して下さい。その他はデフォルトのままでOKです。
EMRのAMIは最新バージョン(執筆時点で2.3.3)を利用するため、ドキュメントによるとこの指定は不要なはずです。むしろ、あるとjobflowが失敗する可能性がある、と書いてあります。しかし、この指定をしなかった時、処理の途中でOutOfMemoryErrorが発生して落ちてしまいました。本来は起動のパラメータ等、別のアプローチを色々検討する必要がありそうですが、ひとまずこのBootstrap Actionで回避できた、という状況です。情報がありましたらコメント等頂ければ嬉しいです。
最後に、設定内容をレビューし、jobflow作成を完了させます。
レコメンドの計算処理を実行する(jobflowの確認)
以上でjobflowが作成され、計算が始まります。開始から5分程度は、HadoopノードとしてのEC2インスタンスを立ち上げたり、クラスタを構築する時間に費やされます。
しばらくすると、EC2インスタンスが数台(ここでは1+3なので4台)立ち上がってくるはずです。SecurityGroupを見ると、1つがmasterで3つがslaveなのが分かると思います。
しばらく時間が掛かるので、退屈しのぎに色々見て回ってみましょう。jobflowの情報を見てみると、起動時に設定した内容を確認できます。
jobflowを選択し、上にあるデバッグボタンをクリックすると、進捗を確認できます。
jobflowが複数の「step」から成ることは前述の通りですが、本エントリにおけるEMRによるMahoutの分散レコメンドは、2つのstepから成っています。1つ目のstepは、EMRのデバッグ機能を有効にするための処理で、ほとんどの場合1分以内に完了します。2つ目のstepが、Mahout本体の実行です。
このstepは、10個の「job」から成っています。jobは、1組のMapReduce処理を表し、複数のmapとreduceから成っていますます。つまり、Mahoutの分散レコメンドは、MapReduceを10回繰り返すことによって最終結果を得る仕組みになっているということです。(下記スナップショットは3つ目のjob実行中に撮ったものですので、まだ3つしか見えていません。)
さらにjobは個々のmap処理やreduce処理を表す「task」から成り立っています。
さらにtaskは「task attempt」から成っていますが、大抵は1つのtaskについてtask attemptは1つです。このattemptは、Hadoopのノード異常で処理に失敗した場合 *3等、同じタスクを再試行した際に複数になります。
さて、jobflowの起動からしばらくすると(筆者の試行の結果は23分でした)jobflowのステータスがCOMPLETEDに変わります。
EC2インスタンスも自動的にterminalteされ、無駄な課金は発生しません。
MapReduceの実行に掛かる課金は、1時間未満の利用は1時間に切り上げての課金となりますので、インスタンスの数とタイプは「1時間弱で完了する」構成が、最もコストを抑えられる可能性があります。EMRで繰り返し計算を実行する場合は、プリファレンスデータの量や質、EC2インスタンスのタイプと数、map及びreduceタスクの分割数(mapred.map.tasks及びmapred.reduce.tasksの値)など、コストに影響を及ぼすパラメータと、最終的なコストを記録し、将来の計算における構成の選択に活かす *4と良いでしょう。
結果の確認
実行した結果は、入力データと同様に、S3に出力されます。起動オプション-Dmapred.output.dirで指定したs3n://cm-emr-tutorial/output-bxつまりcm-emr-tutorialバケットのoutput-bxディレクトリです。このディレクトリ内に、結果がテキストファイルとして出力されています。
出力は一般的に、複数のファイルに分割されています。筆者が試した環境ではpart-r-00000〜part-r-00007という8個のファイルが生成されていました。例えば、part-r-00006の冒頭数行を以下に示します。
254 [92522:9.630823,92525:9.630823,8626:9.630823,65999:9.630823,59718:9.630823,37149:9.630823,27791:9.630823,59961:9.630823,92545:9.630823,90822:9.617716,19446:9.554597,1828:9.518214,1195:9.482219,2528:9.479742,34218:9.444213,38292:9.444213,50616:9.444213,63646:9.444213,17846:9.444213,7137:9.444213,19806:9.444213,70702:9.444213,116393:9.444213,36198:9.444213,100614:9.444213,109022:9.444213,11252:9.444213,40924:9.444213,87972:9.444213,30672:9.444213,165805:9.444213,18838:9.444213,154243:9.444213,162873:9.444213,132388:9.444213,54056:9.444213,4432:9.444213,4559:9.444213,33645:9.444213,119874:9.444213,6548:9.444213,18982:9.444213,1782:9.444213,63606:9.444213,112273:9.444213,22548:9.444213,24791:9.444213,5223:9.42379,105357:9.421428,19680:9.410272,4314:9.409723,30612:9.402454,19023:9.399353,29799:9.39101,13463:9.39101,16156:9.39101,17962:9.39101,97795:9.39101,6206:9.365187,8159:9.35637,175361:9.3394375,92528:9.337471,92478:9.329936,7683:9.310755,4480:9.30896,92506:9.295485,92516:9.295485,92541:9.295485,27443:9.28304,168013:9.239556,92496:9.239472,74915:9.239472,409:9.210344,80439:9.198485,90566:9.170435,106912:9.1609955,39905:9.1609955,5594:9.1609955,154090:9.1609955,4266:9.1609955,70146:9.1609955,5739:9.1609955,13444:9.1609955,25364:9.1609955,30498:9.1609955,71936:9.1609955,81045:9.1609955,76542:9.1609955,26778:9.1609955,37130:9.1609955,29710:9.158854,29709:9.158854,105427:9.140233,7974:9.130101,15513:9.0,13344:9.0,83989:9.0,102821:9.0,15509:9.0,102667:9.0] 446 [51424:10.0,2949:9.025401] 638 [757:10.0,114739:10.0,35643:10.0,20040:10.0,79628:10.0,8988:10.0,65690:10.0,30598:10.0,212176:10.0,11921:10.0,19449:10.0,39485:10.0,84601:10.0,190427:10.0,20649:10.0,57296:10.0,21379:10.0,46708:10.0,38909:10.0,18296:10.0,6039:10.0,103513:10.0,51705:10.0,2123:10.0,29721:10.0,14931:10.0,34979:10.0,31643:10.0,90675:10.0,8794:10.0,34192:10.0,7674:10.0,2103:10.0,126859:10.0,41080:10.0,90647:10.0,59454:10.0,4295:10.0,72243:10.0,5401:10.0,93078:10.0,44387:10.0,78357:10.0,72227:10.0,108987:10.0,52036:10.0,19863:10.0,40470:10.0,7606:10.0,92816:10.0,33761:10.0,53254:10.0,75530:10.0,2004:10.0,62918:10.0,1432:10.0,102798:10.0,71403:10.0,127834:10.0,77629:10.0,77144:10.0,53183:10.0,1378:10.0,11912:10.0,94944:10.0,93268:10.0,93259:10.0,792:10.0,17500:10.0,789:10.0,788:10.0,784:10.0,782:10.0,780:10.0,778:10.0,772:10.0,22488:10.0,763:10.0,762:10.0,76513:10.0,7441:10.0,754:10.0,85417:10.0,134432:10.0,17461:10.0,750:10.0,18016:10.0,82627:10.0,102564:10.0,53088:10.0,93183:10.0,93182:10.0,74225:10.0,93155:10.0,93151:10.0,50197:10.0,151071:10.0,246872:10.0,53034:10.0,14598:10.0] 726 [17088:7.2081723] 758 [162526:9.419243] 1254 [14512:10.0,18890:10.0,14505:10.0,84628:10.0,27220:10.0,9673:10.0,84603:10.0,26370:10.0,73905:10.0,200903:10.0,26346:10.0,8253:10.0,49173:10.0,8931:10.0,141270:10.0] 1790 [157101:6.5261474] 2030 [1685:10.0,2790:10.0,51789:10.0,6318:10.0,169305:10.0,6934:10.0,81292:10.0,9200:10.0,16116:10.0,96873:10.0,50079:10.0,14987:10.0,169395:10.0,1058:10.0,16639:10.0,9407:10.0,5507:10.0,39479:10.0,36136:10.0,41147:10.0,104644:10.0,25525:10.0,22037:10.0,2147:10.0,5487:10.0,23633:10.0,58952:10.0,6588:10.0,1574:10.0,27551:10.0,28094:10.0,32352:10.0,75631:10.0,34225:10.0,26054:10.0,123526:10.0,17694:10.0,75064:10.0,63923:10.0,61692:10.0,46651:10.0,116268:10.0,7647:10.0,55545:10.0,3741:10.0,96193:10.0,6515:10.0,18761:10.0,59977:10.0,69974:10.0,9259:10.0,10929:10.0,7029:10.0,51030:10.0,40438:10.0,890:10.0,63200:10.0,63918:10.0,118028:10.0,4773:10.0,22038:10.0,36519:10.0,35658:10.0,7529:10.0,73809:10.0,13648:10.0,150107:10.0,62100:10.0,19764:10.0,106653:10.0,1924:10.0,34229:10.0,40903:10.0,8594:10.0,67074:10.0,3016:10.0,169548:10.0,22496:10.0,17482:10.0,50340:10.0,54235:10.0,20814:10.0,8002:10.0,169579:10.0,59790:10.0,17433:10.0,20771:10.0,31910:10.0,4057:10.0,17980:10.0,15192:10.0,3731:10.0,10726:10.0,23536:10.0,1806:10.0,12944:10.0,30209:10.0,18511:10.0,45246:10.0,11824:10.0] 2110 [12808:10.0,1420:10.0,156414:10.0,1790:10.0,156640:10.0,16145:10.0,173218:10.0,83540:10.0,47891:10.0,3883:10.0,173214:10.0,48445:10.0,4990:10.0,188249:10.0,173208:10.0,5900:10.0,6106:10.0,45652:10.0,161031:10.0,2236:10.0,2238:10.0,8076:10.0,44530:10.0,163347:10.0,10172:10.0,139768:10.0,11329:10.0,73483:10.0,8313:10.0,73479:10.0,12783:10.0,99038:10.0,83500:10.0,169274:10.0,21668:10.0,3840:10.0,16758:10.0,173158:10.0,73462:10.0,183190:10.0,10519:10.0,22772:10.0,83483:10.0,62316:10.0,53403:10.0,19520:10.0,94062:10.0,53400:10.0,173163:10.0,110768:10.0,53396:10.0,91271:10.0,53394:10.0,48378:10.0,53386:10.0,36886:10.0,15019:10.0,25229:10.0,23859:10.0,180745:10.0,26665:10.0,2687:10.0,57825:10.0,3238:10.0,2678:10.0,17715:10.0,2675:10.0,148254:10.0,108499:10.0,153610:10.0,95121:10.0,2655:10.0,51110:10.0,5433:10.0,38854:10.0,73378:10.0,181986:10.0,58329:10.0,65012:10.0,43999:10.0,198828:10.0,44459:10.0,96756:10.0,7076:10.0,46534:10.0,266078:10.0,6514:10.0,173264:10.0,37146:10.0,4836:10.0,932:10.0,58859:10.0,60526:10.0,95058:10.0,60523:10.0,13734:10.0,58293:10.0,16786:10.0,1473:10.0,150745:10.0] 2358 [25289:8.75465,85637:8.699153,17614:8.697598,72358:8.437523,10821:8.0,26982:8.0,10400:8.0,151645:8.0,1355:8.0,4505:8.0,13272:8.0,96976:8.0,4500:8.0,654:8.0,6407:8.0,17912:8.0,92573:8.0,16669:8.0,16668:8.0,16666:8.0,2690:8.0,770:8.0,112560:8.0,19191:8.0,4188:8.0,4506:8.0,1432:8.0,14856:8.0,5401:8.0,115955:8.0,2103:8.0,7020:8.0,19074:8.0,53868:8.0,30166:8.0,4501:8.0,176614:8.0,20432:8.0,13158:8.0,9468:8.0,21241:8.0]
これは、254番のユーザに対して92522番のアイテムの推定スコアが9.630823である…という結果を表しています。本エントリでは起動オプションで--numRecommendations 100とした為、各ユーザに対するお薦めアイテムが、推定スコアの高い順に最大100件挙げられています。但し、アルゴリズムの都合上、100件に満たない件数しか算出できていないユーザもあることに注意してください。
後片付け
以上で、EMRの試行は完了となりますが、意図しないAWSの課金が無いよう、最後に後片付けをしておきましょう。
ManagementConsoleで、意図せずjobflowが動いていないことを確認。ステートが「COMPLETED」「TERMINATED」「FAILED」の何れかであれば、問題ありません。万一、意図しないjobflowが動いていた場合は、そのjobflowを選択し、Terminateボタンを押下して、jobflowを終了させてください。
EC2のタブで、意図せずインスタンスが起動状態になっていないことを確認。ステートが「terminated」であれば問題ありません。万一、意図しないインスタンスが動いていた場合は、そのインスタンスを選択し、「Instance Actions」ボタンの中から「Terminate」を選択して、インスタンスを終了させてください。
S3のタブで、S3にアップロードしたファイル、EMRにより生成された結果やログなど、全てのファイルを削除しましょう。バケットを消すのが最も確実です。
まとめ
本エントリでは、Mahoutのレコメンデーションを例にEMRを利用してみましたが、クラスタリング・分類等、他のMahout機械学習についても応用可能です。
また、実際にMahoutをガリガリ使っていくのであれば、この本は必携ですね。買ってね!!(広告)
実は私、この本の付録(日本語版書き下ろし)に、本エントリとよく似たチュートリアルを書いたのですが、あっという間に情報が古くなってしまいました。AWS恐るべしであります。というわけで、本エントリはこの本の追補としてもご活用ください。